package com.bieber.server.notifications;

import com.bieber.common.Constants;
import com.bieber.common.Node;
import com.bieber.registry.Notification;
import com.bieber.server.Sender;
import com.bieber.server.Transporter;
import com.bieber.server.config.ServerConfig;
import com.bieber.server.strategy.DefaultStrategy;
import com.bieber.server.strategy.SenderStrategy;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by bieber on 2015/8/21.
 * 用来监听注册中心的其他节点，用来将本地文件同步给他们
 */
public class SenderNotification implements Notification{
    
    private ServerConfig config;
    
    private SenderStrategy strategy;

    private volatile List<Node> lastNodes;

    private static final ScheduledExecutorService SENDER_EXECUTOR = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    public SenderNotification(ServerConfig config) {
        this.config = config;
        this.strategy = new DefaultStrategy(config);
    }

    @Override
    public void notify(List<String> children) {
        if(!Transporter.isRunning()){
            SENDER_EXECUTOR.shutdown();
            return;
        }
        Constants.COMMON_LOGGER.info("node change to [{}]",children);
        synchronized (this){
            List<Node> nodes = this.strategy.convert(children);
            List<Node> filterResult = new ArrayList<Node>();
            Constants.COMMON_LOGGER.info("get remote node [{}]",nodes);
            checkNodeChange(nodes);
            for(Node node:nodes){
                if(Transporter.getSender(node)==null){
                    filterResult.add(node);
                }
            }
            lastNodes=new ArrayList<Node>(filterResult);
            if(filterResult.size()>0){
                Sender sender = new Sender(config,filterResult);
                for(Node node:filterResult){
                    Transporter.registerSender(node, sender);
                }
                SENDER_EXECUTOR.scheduleWithFixedDelay(sender, config.getScanInitialDelay(), config.getScanFileDelay(), TimeUnit.MILLISECONDS);
            }
        }
    }

    private void checkNodeChange(List<Node> currentNodes){
        if(lastNodes==null){
            return ;
        }
        if(currentNodes==null){
            currentNodes=new ArrayList<Node>();
        }
        Iterator<Node> nodeIterator = lastNodes.iterator();
        while(nodeIterator.hasNext()){
            Node node = nodeIterator.next();
            if(!currentNodes.contains(node)){
                Sender reader =  Transporter.unRegisterSender(node);
                if(null!=reader){
                    Constants.COMMON_LOGGER.info("unregister node [{}]",node);
                    reader.unregisterNode(node);
                }
            }
        }
    }
}
